文档

Stream模型

更新时间:

本文介绍Stream模型的功能及使用说明。

背景信息

Stream模型提供子任务生产和消费处理分离的可持续生产消费模式。Produce接口按指定频率持续运行生成一批次子任务信息进入队列,基于缓冲队列框架持续分发子任务给当前业务集群处理。处理分发过程不会等待上一批次执行结束,只要集群有可用机器,资源就会持续分发处理。

image.png

该模型主要解决以下问题场景:

  • 现有的Map模型每次运行都需要等待所有子任务执行完,单个子任务耗时影响整体的后续运行。

  • 对子任务的生产速率和分发处理提供全局的并发控制。

  • 调度服务端提供任务运行期的监控报警。

前提条件

使用限制

客户端存在版本限制,具体版本请在以下界面查看:

  1. 登录分布式任务调度平台,在左侧导航栏,单击任务管理

  2. 任务管理页面,单击创建任务

  3. 创建任务面板,鼠标悬停在执行模式右侧的image.png图标处查看客户端版本要求。

开发接口

业务开发需要实现接口:com.alibaba.schedulerx.worker.processor.StreamJobProcessor

接口

说明

是否必选

public abstract List<Object> produce(JobContext context);

子任务生产方法。

public ProcessResult process(JobContext context) throws Exception;

子任务处理方法。

public ProcessResult reduce(JobContext context);

每批次子任务Reduce方法。

public boolean needReduce();

是否开启Reduce。

高级配置

  1. 登录分布式任务调度平台,在左侧导航栏,单击任务管理

  2. 任务管理页面,单击创建任务

  3. 创建任务面板,执行模式下拉列表选择Stream,在高级配置区域配置相关信息。

    image.png

    配置项

    说明

    分发策略

    指子任务在业务集群中采用的分发模式。可选项:轮询策略WorkerLoad最优策略

    生产频率

    用于控制Produce方法的循环执行间隔,默认3秒。可根据业务的消费能力适度调整。

    子任务失败重试次数

    默认为0,子任务失败会自动重试。

    子任务失败重试间隔

    子任务失败重试间隔,单位:秒。默认为0。

    子任务队列容量

    生产临时存放的子任务队列。如果队列满了则会阻塞Produce的生产防止消费处理端积压。

    子任务全局并发数

    流式模型下支持子任务全局并发数功能,该功能可以进行限流,同时配合子任务队列容量控制Produce,默认1000。

    限流示例场景:例如下游有性能瓶颈,上千台机器每台机器限制单机子任务并发数为1,还是承受不了大量访问。

实践案例

在您的开发软件(例如IntelliJ IDEA)中,参考以下示例修改应用工程代码。

public class SimpleStreamProcess extends StreamJobProcessor {

    private int index = 0;

    @Override
    public boolean needReduce() {
        return true;
    }

    @Override
    public List<Object> produce(JobContext context) {
        if (index++ < Integer.MAX_VALUE) {
            return Lists.newArrayList(index+"-"+1, index+"-"+2, index+"-"+3, index+"-"+4);
        } else {
            return null;
        }
    }

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        Object task = context.getTask();
        // 加载子任务进行业务处理
        System.out.println(task);
        TimeUnit.SECONDS.sleep(1);
        return new ProcessResult(true);
    }

    @Override
    public ProcessResult reduce(JobContext context) {
        // 获取当前批次的子任务处理结果信息
        System.out.println("BatchNo:"+ context.getSerialNum()+ "  TaskSize:"+context.getTaskStatuses());
        return new ProcessResult(true);
    }
}

运行查看

查看执行记录列表,存在一条持续运行中的实例记录。您可在当前执行情况页签,查看当前运行中的每批次执行情况,以及最近10批次历史记录。任务配置失败和超时报警时,根据每个批次是否成功或超时进行相应的报警通知。

image.png

  • 本页导读 (1)